# -*- coding: utf-8 -*-
# =============================================================================
#         Desc: 使用mysqlbinlog命令对binlog进行分析找出
#               1、执行时间较长事务。
#               2、影响行数较多事务。
#               3、产生Binlog量较大的事务。
#       Author: GGA
#        Email:
#     HomePage:
#      Version: 1.0.0
#   LastChange: 2020-12-20
#      History:
# =============================================================================
import os
import sys
import subprocess
import logging
import re
import datetime

# Path to (or command name of) the mysqlbinlog executable
mysql_binlog_exe = "mysqlbinlog"
# Maximum binlog length of one transaction in bytes; at or above this size
# the transaction is classified as a big transaction
max_tran_binlog_size = 1 * 1024 * 1024
# Maximum number of DML operations in one transaction; at or above this
# count the transaction is classified as a big transaction
max_tran_dml_count = 1000
# Maximum duration of one transaction in seconds; a longer transaction is
# classified as a big (long-running) transaction
max_tran_run_seconds = 2
# Number of big transactions to collect; collection stops once this number
# is exceeded
max_big_tran_count = 1000

# Root logger used by the whole script (getLogger() is called without a name)
logger = logging.getLogger()


class LoggerHelper(object):
    """Configures console logging for this script."""

    # Handler installed by init_logger(); remembered so that repeated calls
    # do not attach a second handler and print every log record twice.
    _stream_handler = None

    @classmethod
    def init_logger(cls):
        """Route DEBUG-and-above records of the root logger to stdout.

        The call is idempotent: if the handler installed by a previous call
        is still attached to the root logger, no additional handler is added.
        """
        # The module-level ``logger`` is created with logging.getLogger()
        # (no name), i.e. it is the root logger, so fetching the root logger
        # here configures the very same object.
        root_logger = logging.getLogger()
        root_logger.setLevel(level=logging.DEBUG)
        if cls._stream_handler is not None and cls._stream_handler in root_logger.handlers:
            return
        logger_format = logging.Formatter("[%(asctime)s]-[%(levelname)s]: %(message)s")  # output format
        sh = logging.StreamHandler(stream=sys.stdout)  # output to standard output
        sh.setFormatter(logger_format)
        root_logger.addHandler(sh)
        cls._stream_handler = sh


class TransactionInfo(object):
    """Position, time and DML statistics collected for one binlog transaction.

    Attributes:
        start_post / stop_post: binlog offsets at which the transaction
            starts and ends (0 until set by the caller).
        start_time / stop_time: ``datetime`` of the start and of the commit,
            or ``None`` while unknown.
        dml_map: maps a stripped DML marker line (for example
            "### INSERT INTO `db`.`t`") to the number of times it was seen.
    """

    # Substrings that identify a DML line in the decoded mysqlbinlog output.
    DML_FLAGS = ["### INSERT INTO", "### UPDATE", "### DELETE FROM"]

    def __init__(self):
        self.start_post = 0
        self.stop_post = 0
        self.start_time = None
        self.stop_time = None
        self.dml_map = dict()

    def get_dml_count(self):
        """Return the total number of DML lines recorded in ``dml_map``."""
        return sum(self.dml_map.values())

    def get_binlog_size(self):
        """Return the number of binlog bytes between start and stop offset."""
        return self.stop_post - self.start_post

    def get_tran_seconds(self):
        """Return the duration in seconds, or 0 when either time is unknown."""
        if self.start_time is None or self.stop_time is None:
            return 0
        return (self.stop_time - self.start_time).total_seconds()

    def is_big_tran(self):
        """Return True when any module-level threshold is crossed.

        Size and DML count trigger at ``>=`` their threshold; the duration
        triggers only when strictly greater than ``max_tran_run_seconds``.
        """
        return (
            self.get_binlog_size() >= max_tran_binlog_size
            or self.get_dml_count() >= max_tran_dml_count
            or self.get_tran_seconds() > max_tran_run_seconds
        )

    @staticmethod
    def _format_time(value):
        """Format a datetime for the report; unknown (None) becomes "N/A"."""
        if value is None:
            return "N/A"
        return value.strftime("%Y-%m-%d %H:%M:%S")

    def show_trans(self):
        """Print a human-readable report of this transaction to stdout.

        Unknown start/stop times are printed as "N/A" instead of raising
        AttributeError, so a transaction whose start was never observed can
        still be reported.
        """
        print("=" * 30)
        print("事务开始位点:{}".format(self.start_post))
        print("事务结束位点:{}".format(self.stop_post))
        print("事务开始时间:{}".format(self._format_time(self.start_time)))
        print("事务结束时间:{}".format(self._format_time(self.stop_time)))
        print("事务BINLOG长度:{}".format(self.get_binlog_size()))
        print("事务DML次数:{}".format(self.get_dml_count()))
        print("事务持续时间:{}".format(self.get_tran_seconds()))
        for key_item, key_value in self.dml_map.items():
            print("{}操作次数：{}".format(key_item, key_value))

    def update_dml_map(self, binlog_line):
        """Count ``binlog_line`` in ``dml_map`` if it contains a DML flag.

        Lines without any of ``DML_FLAGS`` are ignored. The stripped line is
        used as the key, so identical operations on the same table share one
        counter.
        """
        if not any(dml_flag in binlog_line for dml_flag in self.DML_FLAGS):
            return
        dml_key = binlog_line.strip()
        self.dml_map[dml_key] = self.dml_map.get(dml_key, 0) + 1


def get_binlog_datetime(binlog_line):
    """Extract the timestamp that precedes "server id" in an event header.

    The text between the first character (skipped) and the "server id"
    marker is expected to look like "YYMMDD HH:MM:SS"; it is prefixed with
    "20" to form a four-digit year before parsing.

    Returns:
        A ``datetime.datetime``, or ``None`` when the line does not contain
        "server id" at all.

    Raises:
        ValueError: if the marker only occurs at index 0, or the extracted
            text does not match the expected timestamp layout.
    """
    marker = "server id"
    if marker not in binlog_line:
        return None
    # Search from index 1 because the first character is not part of the
    # timestamp.
    marker_at = binlog_line.index(marker, 1)
    stamp = binlog_line[1:marker_at].strip()
    return datetime.datetime.strptime("20" + stamp, "%Y%m%d %H:%M:%S")


# Matches the "# at <offset>" position marker in the mysqlbinlog output.
_BINLOG_POS_RE = re.compile(r"^# at (\d+)$")
# Extended regex handed to grep: keep only DML marker lines, position markers
# and the event header lines containing "thread_id=" or "Xid =".
_BINLOG_LINE_FILTER = "### INSERT INTO|### UPDATE|### DELETE FROM|# at|thread_id=|Xid ="


def _start_binlog_pipeline(dump_sql_path):
    """Start ``mysqlbinlog | grep -E`` without a shell.

    Passing argument lists (instead of a shell command string) means the
    file path is never interpreted by a shell, so quotes or other special
    characters in it are harmless. stderr is inherited from this process so
    tool errors stay visible and no unread pipe can fill up and block.

    Returns:
        Tuple ``(dump_proc, grep_proc)``; filtered lines are read from
        ``grep_proc.stdout``.

    Raises:
        OSError: if either executable cannot be started.
    """
    dump_proc = subprocess.Popen(
        [mysql_binlog_exe, "-v", "--base64-output=DECODE-ROWS", dump_sql_path],
        stdout=subprocess.PIPE,
    )
    try:
        grep_proc = subprocess.Popen(
            ["grep", "-E", _BINLOG_LINE_FILTER],
            stdin=dump_proc.stdout,
            stdout=subprocess.PIPE,
        )
    except OSError:
        # Do not leave mysqlbinlog running when the filter cannot be started.
        dump_proc.kill()
        dump_proc.stdout.close()
        dump_proc.wait()
        raise
    # Close our copy of the pipe so that mysqlbinlog gets SIGPIPE if grep
    # exits first.
    dump_proc.stdout.close()
    return dump_proc, grep_proc


def _stop_binlog_pipeline(dump_proc, grep_proc, aborted):
    """Close the pipeline, reap both children, return mysqlbinlog's status.

    When ``aborted`` is true the output was not read to the end, so both
    processes are terminated first instead of being waited on indefinitely.
    """
    grep_proc.stdout.close()
    if aborted:
        for proc in (grep_proc, dump_proc):
            try:
                if proc.poll() is None:
                    proc.terminate()
            except OSError:
                pass  # the process exited between poll() and terminate()
    grep_proc.wait()
    return dump_proc.wait()


def _finish_transaction(tran_info, stop_post, stop_time, big_trans):
    """Stamp the end of ``tran_info`` and collect it if it is a big one.

    A transaction whose start was never observed is dropped: neither its
    size nor its duration can be computed meaningfully.
    """
    if tran_info.start_time is None:
        return
    tran_info.stop_post = stop_post
    tran_info.stop_time = stop_time
    if tran_info.is_big_tran():
        big_trans.append(tran_info)


def _collect_big_transactions(binlog_lines):
    """Scan filtered mysqlbinlog lines and collect big/long transactions.

    Args:
        binlog_lines: iterable of ``bytes`` lines as produced by the
            mysqlbinlog/grep pipeline.

    Returns:
        Tuple ``(big_trans, limit_reached)``. ``limit_reached`` is True when
        scanning stopped early because ``max_big_tran_count`` transactions
        had been collected.
    """
    tran_info = TransactionInfo()
    big_trans = []
    last_log_pos = 0
    line_num = 0
    # Time of the most recent "Xid =" (commit) header seen so far.
    commit_tran_time = None
    # True when an "Xid =" header was seen since the current transaction began.
    tran_committed = False
    for file_bytes in binlog_lines:
        # Undecodable bytes must not abort the whole scan.
        file_line = file_bytes.decode("utf-8", "replace").strip()
        line_num += 1
        if line_num % 10000 == 0:
            logger.info("处理记录行：{},处理位点：{},处理记录信息：{}".format(line_num, last_log_pos, file_line))
        if file_line.startswith("###   @"):
            # Column value lines can match the grep filter by accident.
            continue
        pos_match = _BINLOG_POS_RE.match(file_line)
        if pos_match:
            last_log_pos = int(pos_match.group(1))
            continue
        if file_line.find("Xid =") > 0:
            commit_tran_time = get_binlog_datetime(binlog_line=file_line)
            tran_committed = True
        elif file_line.find("Query") > 0 and file_line.find("thread_id=") > 0:
            if commit_tran_time is not None:
                # A commit has been seen: this Query header closes the
                # transaction collected so far.
                # NOTE(review): when no Xid was seen since the transaction
                # started, stop_time is the time of an earlier commit (same
                # as the original behaviour) -- confirm this is acceptable.
                _finish_transaction(tran_info, last_log_pos, commit_tran_time, big_trans)
                if len(big_trans) >= max_big_tran_count:
                    logger.info("采集大事务数量达到上限，退出采集...")
                    return big_trans, True
            # Every Query header (including those before the first commit)
            # opens a fresh transaction, so the first transaction of the
            # file gets a real start position and start time.
            tran_info = TransactionInfo()
            tran_info.start_post = last_log_pos
            tran_info.start_time = get_binlog_datetime(binlog_line=file_line)
            tran_committed = False
        else:
            tran_info.update_dml_map(file_line)
    # The last transaction is not followed by another Query header; evaluate
    # it here if its commit was seen.
    if tran_committed and commit_tran_time is not None:
        _finish_transaction(tran_info, last_log_pos, commit_tran_time, big_trans)
    return big_trans, False


def _print_transaction_summary(big_trans, dump_sql_path):
    """Print the thresholds and every collected transaction to stdout."""
    print("=" * 30)
    print("发现{}个大事务或长事务".format(len(big_trans)))
    print("=" * 30)
    print("大事务或长事务标准：")
    print("- DML操作数量超过{}条为大事务".format(max_tran_dml_count))
    print("- Binlog长度超过{}字节为大事务".format(max_tran_binlog_size))
    print("- 事务持续时间超过{}秒为长事务".format(max_tran_run_seconds))
    for big_tran in big_trans:
        big_tran.show_trans()
    if len(big_trans) >= max_big_tran_count:
        print("采集大事务数量达到上限，请适当增加大事务阈值")
    print("检查文件完成：{}".format(dump_sql_path))


def get_transaction_summary(dump_sql_path):
    """Analyse one binlog file and print its big or long transactions.

    The file is decoded with ``mysqlbinlog -v --base64-output=DECODE-ROWS``
    and filtered with ``grep -E``. Transactions that cross any of the
    module-level thresholds are collected (at most ``max_big_tran_count``)
    and reported on stdout.

    Args:
        dump_sql_path: path of the binlog file to analyse.

    Returns:
        None. If the external tools cannot be started an error is logged and
        nothing is printed; if mysqlbinlog exits with a non-zero status an
        error is logged and the (possibly incomplete) result is still
        printed.
    """
    logger.info("开始检查文件：{}".format(dump_sql_path))
    try:
        dump_proc, grep_proc = _start_binlog_pipeline(dump_sql_path)
    except OSError as ex:
        logger.error("无法启动mysqlbinlog或grep：{}".format(ex))
        return
    finished_reading = False
    try:
        big_trans, limit_reached = _collect_big_transactions(
            iter(grep_proc.stdout.readline, b"")
        )
        finished_reading = not limit_reached
    finally:
        # Always reap the children, also on early exit or on an exception.
        dump_status = _stop_binlog_pipeline(
            dump_proc, grep_proc, aborted=not finished_reading
        )
    if finished_reading and dump_status != 0:
        logger.error("mysqlbinlog执行失败，退出码：{}，分析结果可能不完整".format(dump_status))
    _print_transaction_summary(big_trans, dump_sql_path)


def show_usage():
    """Log a short description of the tool and an example command line."""
    usage_lines = (
        "解析mysql binlog文件分析",
        "使用语法: python binlog_transaction_check.py mysql-bin.10001",
    )
    for usage_line in usage_lines:
        logger.info(usage_line)


def main(args):
    """Validate the command line and analyse the given binlog file.

    Args:
        args: argument vector in ``sys.argv`` layout; exactly one argument
            (the binlog file path) is expected after the program name.

    When the argument count is wrong or the path does not exist, a message
    and the usage text are logged and nothing is analysed.
    """
    problem = None
    if len(args) != 2:
        problem = "传入参数错误,请重试！"
    elif not os.path.exists(args[1]):
        problem = "指定的文件不存在,请重试！"

    if problem is None:
        get_transaction_summary(dump_sql_path=args[1])
        return

    logger.info(problem)
    show_usage()


if __name__ == '__main__':
    # Script entry point: set up console logging first, then hand the raw
    # command-line arguments to main() for validation and analysis.
    LoggerHelper.init_logger()
    main(sys.argv)
